MapReduce:Map端Join算法实现

您所在的位置:网站首页 mapreduce map MapReduce:Map端Join算法实现

MapReduce:Map端Join算法实现

#MapReduce:Map端Join算法实现| 来源: 网络整理| 查看: 265

Map端Join算法实现

1.原理阐述: 适用于关联表中有小表的情形; 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度 2.实现示例 –先在mapper类中预先定义好小表,进行join –引入实际场景中的解决方案:一次加载数据库或者用

资源文件

orders.txt---->放在本地电脑中 pdts.txt---->放在集群上 orders.txt

1001,20150710,p0001,2 1002,20150710,p0002,3 1002,20150710,p0003,3

pdts.txt

p0001,xiaomi,1000,2 p0002,appale,1000,3 p0003,samsung,1000,4 定义MapJoinMap package com.czxy.demo04; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; public class MapJoinMap extends Mapper { String line=null; HashMap map=new HashMap(); /** * * 用于程序的初始化 * * 只有在程序启动的时候才会执行 * 而且这个代码块只执行一次 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //获取缓存下来的数据 URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration()); FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration()); FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0])); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open)); //p0001,xiaomi,1000,2 while ((line = bufferedReader.readLine())!=null){ String[] split = line.split(","); System.out.println(split[0]); map.put(split[0],line); //key:p0001 //value:p0001,xiaomi,1000,2 } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // value 就是 1001,2015010,p0001,2 String[] orderSplit = value.toString().split(",");//1001 2015010 p0001 2 String s = map.get(orderSplit[2]);//p0001,xiaomi,1000,2 context.write(new Text(s),new Text(orderSplit[0]+"\t"+orderSplit[1]+"\t"+orderSplit[3])); } } 定义MapJionDriver package com.czxy.demo04; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.net.URI; public class MapJionDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration cf = new Configuration(); //将小数据添加到分布式缓存 DistributedCache.addCacheFile(new URI("hdfs://192.168.100.100:8020/aaaaa/pdts.txt"),cf); FileSystem fs = FileSystem.get(cf); Job JD = Job.getInstance(cf, "MapJionDriver"); JD.setJarByClass(MapJionDriver.class); JD.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(JD,new Path("E:\\传智专修学院大数据课程\\第一学期\\作业\\MapReduce阶段作业\\MapReduceMap端Join")); JD.setMapperClass(MapJoinMap.class); JD.setMapOutputKeyClass(Text.class); JD.setMapOutputValueClass(Text.class); JD.setOutputFormatClass(TextOutputFormat.class); boolean exists = fs.exists(new Path("E:\\传智专修学院大数据课程\\第一学期\\作业\\MapReduce阶段作业\\MapReduceMap端Join\\out")); if (exists){ fs.delete(new Path("E:\\传智专修学院大数据课程\\第一学期\\作业\\MapReduce阶段作业\\MapReduceMap端Join\\out")); } TextOutputFormat.setOutputPath(JD,new Path("E:\\传智专修学院大数据课程\\第一学期\\作业\\MapReduce阶段作业\\MapReduceMap端Join\\out")); return JD.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { ToolRunner.run(new MapJionDriver(),args); } } 结果

在这里插入图片描述



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3